/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.query.groupby;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.druid.data.input.Row;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.ValueType;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;

/**
 * Utility class for conditional serde of {@link ResultRow} objects. Depending on the query configuration and the query
 * dimensions, this class chooses an optimally performant method for serdeing the result rows while also preserving the
 * dimension classes.
 * Any modification to this class must be benchmarked properly as it runs in a hot-loop and can have significant impact
 * on long-running queries. See {@code GroupByDeserializationBenchmark} for existing benchmarks
 */
public final class ResultRowObjectMapperDecoratorUtil
{
  private ResultRowObjectMapperDecoratorUtil()
  {
    // Static utility class; no instantiation.
  }

  /**
   * Decorates the provided object mapper so that it can read the result rows generated by the given query and the
   * groupByQueryConfig. It never modifies the provided object mapper. It can either return the same mapper undecorated,
   * or clones the object mapper before decorating it.
   */
  public static ObjectMapper decorateObjectMapper(
      final ObjectMapper baseObjectMapper,
      final GroupByQuery query,
      final GroupByQueryConfig groupByQueryConfig
  )
  {
    final JsonDeserializer<ResultRow> deserializer = getDeserializer(baseObjectMapper, query, groupByQueryConfig);
    final JsonSerializer<ResultRow> serializer = getSerializer(query, groupByQueryConfig);
    if (deserializer == null && serializer == null) {
      // Fast path: no decoration needed, avoid the cost of copying the mapper.
      return baseObjectMapper;
    }

    final ObjectMapper decoratedObjectMapper = baseObjectMapper.copy();
    class GroupByResultRowModule extends SimpleModule
    {
      private GroupByResultRowModule()
      {
        if (serializer != null) {
          addSerializer(ResultRow.class, serializer);
        }
        if (deserializer != null) {
          addDeserializer(ResultRow.class, deserializer);
        }
      }
    }
    decoratedObjectMapper.registerModule(new GroupByResultRowModule());
    return decoratedObjectMapper;
  }

  /**
   * Returns a deserializer required for the result rows of the provided query. It returns null if no special
   * deserialization is required, and type-unaware generic java objects are sufficient.
   */
  @Nullable
  private static JsonDeserializer<ResultRow> getDeserializer(
      final ObjectMapper objectMapper,
      final GroupByQuery query,
      final GroupByQueryConfig groupByQueryConfig
  )
  {
    final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
    final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat();
    final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode;
    final boolean dimensionsRequireConversion = query.getDimensions()
                                                     .stream()
                                                     .anyMatch(
                                                         dimensionSpec -> dimensionRequiresConversion(dimensionSpec.getOutputType())
                                                     );

    // Most common case - when array based rows are used, and grouping is done on primitive/array/json types
    if (arrayBasedRows && !dimensionsRequireConversion) {
      // We can assume ResultRow are serialized and deserialized as arrays. No need for special decoration,
      // and we can save the overhead of making a copy of the ObjectMapper
      return null;
    } else if (!arrayBasedRows && !dimensionsRequireConversion) {
      // We have to deserialize map based rows, however we don't have to deserialize the dimensions individually
      // Returns a deserializer that can deserialize both map and array based rows simultaneously
      return new JsonDeserializer<ResultRow>()
      {
        @Override
        public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
        {
          if (jp.isExpectedStartObjectToken()) {
            // Map-based (legacy) row: read as a Row and convert.
            final Row row = jp.readValueAs(Row.class);
            return ResultRow.fromLegacyRow(row, query);
          } else {
            // Array-based row: positions already match the ResultRow layout.
            return ResultRow.of(jp.readValueAs(Object[].class));
          }
        }
      };

    } else {
      // Dimensions need to be serialized individually because some of them require conversion to specialized types
      return new JsonDeserializer<ResultRow>()
      {
        // Precomputed per-position types; only complex (non-JSON) dimension positions carry a non-Object type.
        final JavaType[] javaTypes = createJavaTypesForResultRow(query);

        @Override
        public ResultRow deserialize(final JsonParser jp, final DeserializationContext ctxt) throws IOException
        {
          if (jp.isExpectedStartObjectToken()) {
            final Row row = jp.readValueAs(Row.class);
            final ResultRow resultRow = ResultRow.fromLegacyRow(row, query);

            // Re-convert only the dimensions whose values came back as generic java objects.
            final List<DimensionSpec> queryDimensions = query.getDimensions();
            for (int i = 0; i < queryDimensions.size(); ++i) {
              if (dimensionRequiresConversion(queryDimensions.get(i).getOutputType())) {
                final int dimensionIndexInResultRow = query.getResultRowDimensionStart() + i;
                resultRow.set(
                    dimensionIndexInResultRow,
                    objectMapper.convertValue(
                        resultRow.get(dimensionIndexInResultRow),
                        javaTypes[dimensionIndexInResultRow]
                    )
                );
              }
            }

            return resultRow;
          } else {
            if (!jp.isExpectedStartArrayToken()) {
              throw DruidException.defensive("Expected start token, received [%s]", jp.currentToken());
            }

            final Object[] objectArray = new Object[query.getResultRowSizeWithPostAggregators()];
            int index = 0;

            while (jp.nextToken() != JsonToken.END_ARRAY) {
              objectArray[index] = JacksonUtils.readObjectUsingDeserializationContext(jp, ctxt, javaTypes[index]);
              ++index;
            }

            return ResultRow.of(objectArray);
          }
        }
      };
    }
  }

  /**
   * Returns a legacy mode aware serializer that serializes the result rows as arrays or maps depending on the query
   * configuration
   */
  @Nullable
  private static JsonSerializer<ResultRow> getSerializer(
      final GroupByQuery query,
      final GroupByQueryConfig groupByQueryConfig
  )
  {
    final boolean resultAsArray = query.context().getBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
    final boolean intermediateCompatMode = groupByQueryConfig.isIntermediateResultAsMapCompat();
    final boolean arrayBasedRows = resultAsArray && !intermediateCompatMode;
    if (arrayBasedRows) {
      // Default array serialization of ResultRow is already correct; no decoration required.
      return null;
    } else {
      if (resultAsArray) {
        // Intermediate compat mode: rows arrive map-based internally but must be emitted as arrays.
        return new JsonSerializer<ResultRow>()
        {
          @Override
          public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
              throws IOException
          {
            JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, resultRow.getArray());
          }
        };

      } else {
        // Map-based output: convert to the legacy MapBasedRow representation before writing.
        return new JsonSerializer<ResultRow>()
        {
          @Override
          public void serialize(ResultRow resultRow, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
              throws IOException
          {
            JacksonUtils.writeObjectUsingSerializerProvider(
                jsonGenerator,
                serializerProvider,
                resultRow.toMapBasedRow(query)
            );
          }
        };
      }
    }
  }

  /**
   * Returns true if the dimension needs to be converted from generic Java objects to the specialized column type. It involves all
   * complex types, except for JSON types. JSON types are special in a way that they can work with the generic java objects
   * without any conversion
   */
  private static boolean dimensionRequiresConversion(final ColumnType dimensionType)
  {
    return dimensionType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(dimensionType);
  }

  /**
   * Creates java types for deserializing the result row. For timestamp, aggregators and post-aggregators, it resorts to
   * {@code Object.class}. For dimensions requiring conversion (check {@link #dimensionRequiresConversion(ColumnType)}),
   * it returns the java type for the associated class of the complex object.
   */
  private static JavaType[] createJavaTypesForResultRow(final GroupByQuery groupByQuery)
  {
    final TypeFactory typeFactory = TypeFactory.defaultInstance();
    // Hoist loop-invariant values; this method feeds a hot deserialization path.
    final int resultRowSize = groupByQuery.getResultRowSizeWithPostAggregators();
    final int dimensionStart = groupByQuery.getResultRowDimensionStart();
    final int aggregatorStart = groupByQuery.getResultRowAggregatorStart();
    final JavaType objectType = typeFactory.constructType(Object.class);
    final JavaType[] javaTypes = new JavaType[resultRowSize];
    final List<DimensionSpec> dimensions = groupByQuery.getDimensions();
    for (int i = 0; i < resultRowSize; ++i) {
      if (i >= dimensionStart && i < aggregatorStart) {
        final DimensionSpec dimension = dimensions.get(i - dimensionStart);
        final ColumnType dimensionType = dimension.getOutputType();
        if (dimensionType.is(ValueType.COMPLEX)) {
          //noinspection rawtypes
          final NullableTypeStrategy nullableTypeStrategy = dimensionType.getNullableStrategy();
          if (!nullableTypeStrategy.groupable()) {
            throw DruidException.defensive(
                "Ungroupable dimension [%s] with type [%s] found in the query.",
                dimension,
                dimensionType
            );
          }
          javaTypes[i] = typeFactory.constructType(nullableTypeStrategy.getClazz());
        } else {
          javaTypes[i] = objectType;
        }
      } else {
        // Timestamp, aggregator, and post-aggregator positions deserialize as generic objects.
        javaTypes[i] = objectType;
      }
    }
    return javaTypes;
  }
}
